package com.wsjj.yjh;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Flink streaming demo of keyed {@link ReducingState}.
 *
 * <p>The job reads text lines from a socket, parses each line of the form
 * {@code id,ts,vc} into a {@code WaterSensor}, assigns event-time timestamps
 * from {@code getTs()}, keys the stream by sensor id and keeps a running sum
 * of {@code vc} per key in reducing state. After every element the current
 * sum for that element's key is printed.
 */
public class keyedReducingStateDemo {
    /** Host of the text socket the job reads from. */
    private static final String SOURCE_HOST = "1.94.41.70";

    /** Port of the text socket the job reads from. */
    private static final int SOURCE_PORT = 7777;

    /** Minimum number of comma-separated fields a line needs: id, ts, vc. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        env.socketTextStream(SOURCE_HOST, SOURCE_PORT)
                // flatMap rather than map so that a malformed line can be dropped
                // instead of throwing inside the operator and failing the job.
                .flatMap((String line, Collector<WaterSensor> out) -> parseLine(line, out))
                // The Collector's type argument is erased in a lambda, so the
                // produced type has to be declared explicitly.
                .returns(WaterSensor.class)
                .assignTimestampsAndWatermarks(watermarkStrategy())
                .keyBy(value -> value.id)
                .process(new VcSumFunction())
                .print();

        env.execute();
    }

    /**
     * Parses one {@code id,ts,vc} line and emits the resulting sensor reading.
     *
     * <p>Lines with fewer than three fields, or whose {@code ts}/{@code vc}
     * fields are not valid integers, are reported on standard error and
     * skipped. Whitespace around the two numeric fields (including a trailing
     * carriage return) is ignored; the id field is used exactly as received.
     * Fields beyond the third are ignored.
     *
     * @param line raw text line read from the socket
     * @param out  collector that receives the parsed reading, if any
     */
    private static void parseLine(String line, Collector<WaterSensor> out) {
        String[] fields = line.split(",");
        if (fields.length < REQUIRED_FIELDS) {
            System.err.println("Skipping malformed input line (expected id,ts,vc): " + line);
            return;
        }
        try {
            Long ts = Long.valueOf(fields[1].trim());
            Integer vc = Integer.valueOf(fields[2].trim());
            out.collect(new WaterSensor(fields[0], ts, vc));
        } catch (NumberFormatException e) {
            System.err.println("Skipping input line with non-numeric ts/vc: " + line);
        }
    }

    /**
     * Creates the watermark strategy: bounded out-of-orderness of 3 seconds,
     * with the event timestamp taken from {@code WaterSensor.getTs()}.
     *
     * @return the watermark strategy applied to the parsed stream
     */
    private static WatermarkStrategy<WaterSensor> watermarkStrategy() {
        return WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs();
                    }
                });
    }

    /**
     * Keeps a per-key running sum of {@code vc} in {@link ReducingState} and
     * emits a message with the current sum for every incoming element.
     */
    private static final class VcSumFunction extends KeyedProcessFunction<String, WaterSensor, String> {
        private static final long serialVersionUID = 1L;

        /** Running sum of vc for the current key; obtained in {@link #open}. */
        private transient ReducingState<Integer> reducing;

        @Override
        public void open(Configuration parameters) throws Exception {
            // The cast pins the method reference to ReduceFunction<Integer>.
            reducing = getRuntimeContext().getReducingState(
                    new ReducingStateDescriptor<Integer>(
                            "reducing", (ReduceFunction<Integer>) Integer::sum, Types.INT));
        }

        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
            // Fold this reading into the state first so the emitted sum includes it.
            reducing.add(value.getVc());
            out.collect("key的值：" + value.id + ",vc的总和" + reducing.get());
        }
    }
}
